TransactionSynchronizationManager

您所在的位置:网站首页 no value for bottom TransactionSynchronizationManager

TransactionSynchronizationManager

2023-04-09 10:56| 来源: 网络整理| 查看: 265

欢饮大家关注个人公众号:

最近在搞一个功能点,是执行刷新ES的操作,前提是之前的操作DB的动作都完成了并事务提交了之后才能执行刷ES的的动作,TransactionSynchronizationManager这个类就派上用场了,这个类很多地方都有用到,比如我们连接数据库的连接等等,下面我们来探讨下TransactionSynchronizationManager这个真实的面目是怎样的。 概念TransactionSynchronizationManager是事务同步管理器,监听事务的操作,来实现在事务前后可以添加一些指定操作. 使用使用起来也很简单,几行代码就搞定,只需要注册一个TransactionSynchronization实例就可以了,但是我们一般都是注册它的设配器:TransactionSynchronizationAdapter。

public void registerSynchronization(Consumer afterCommit, String uuid) { // 这里主要是判断是否开启了事务,如果没有开启事务是会报错的 // 所以这里判断是否开启了事务,如果没有开启事务则直接执行方法 boolean actualTransactionActive = TransactionSynchronizationManager.isActualTransactionActive(); if (!actualTransactionActive) { LOGGER.info("uuid: {}, 当前线程: {}, 没有激活事务, 直接执行 afterCommit.", uuid, Thread.currentThread().getName()); afterCommit.accept(uuid); return; } // 如果开始了事务则在这里注册一个同步事务,将监听当前线程事务的动作 LOGGER.info("uuid: {}, 当前线程: {}, 激活事务, 注册事务提交后的回调 afterCommit.", uuid, Thread.currentThread().getName()); TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() { @Override public void afterCommit() { // 调用父类的事务提交方法 super.afterCommit(); LOGGER.info("uuid: {}, 当前线程: {}, 开始执行事务提交后的回调 afterCommit.", uuid, Thread.currentThread().getName()); // 事务提交之后,则执行我们的目标方法 afterCommit.accept(uuid); } }); }

我们也可以封装一个方法来调用,实现异步线程去事务提交后方法,如下面封装,下面这种封装,都会是相同的一个事务注册同步类,因为放到了spring ioc容器去管理了,当前线程事务被提交,则会执行RUNNABLES ,执行完之后,在afterCompletion方法会移除掉任务,意思就是RUNNABLES 的任务执行都是在同一个事务提交之后执行,如果想做到一个事务一个任务去执行,则每次调用这个方法,新注册一个新的事务同步器,就上面的那个例子,每次调用都会新注册一个新的事务同步器,这样子执行的结果互相隔离,互补干扰:

@Component public class AfterCommitExecutorImpl extends TransactionSynchronizationAdapter implements AfterCommitExecutor { private static final Logger LOGGER = LoggerFactory.getLogger(AfterCommitExecutorImpl.class); // 保存要运行的任务线程 private static final ThreadLocal RUNNABLES = new ThreadLocal(); // 设置线程池 private ExecutorService threadPool = Executors.newFixedThreadPool(5); @Override public void execute(Runnable runnable) { LOGGER.info("Submitting new runnable {} to run after commit", runnable); // 如果没有开启事务,则执行运行 if (!TransactionSynchronizationManager.isSynchronizationActive()) { LOGGER.info("Transaction synchronization is NOT ACTIVE. Executing right now runnable {}", runnable); threadPool.execute(runnable); return; } // 开启了事务,则判断是否初始化,没有初始化则初始化,并注册 List threadRunnables = RUNNABLES.get(); if (threadRunnables == null) { threadRunnables = new ArrayList(); RUNNABLES.set(threadRunnables); TransactionSynchronizationManager.registerSynchronization(this); } threadRunnables.add(runnable); } // 监听到事务提交之后执行方法 @Override public void afterCommit() { List threadRunnables = RUNNABLES.get(); LOGGER.info("Transaction successfully committed, executing {} runnables", threadRunnables.size()); // 循环遍历执行任务 for (int i = 0; i < threadRunnables.size(); i++) { Runnable runnable = threadRunnables.get(i); LOGGER.info("Executing runnable {}", runnable); try { threadPool.execute(runnable); } catch (RuntimeException e) { LOGGER.error("Failed to execute runnable " + runnable, e); } } } // 判断 @Override public void afterCompletion(int status) { LOGGER.info("Transaction completed with status {}", status == STATUS_COMMITTED ? "COMMITTED" : "ROLLED_BACK"); RUNNABLES.remove(); } } TransactionSynchronizationManager 源码探讨

上面的使用比较简单,接下来我们看看源码:

TransactionSynchronizationManager的结构:

public abstract class TransactionSynchronizationManager { //线程上下文中保存着【线程池对象:ConnectionHolder】的Map对象。线程可以通过该属性获取到同一个Connection对象。 private static final ThreadLocal resources = new NamedThreadLocal("Transactional resources"); //事务同步器,是Spring交由程序员进行扩展的代码,每个线程可以注册N个事务同步器。 private static final ThreadLocal synchronizations = new NamedThreadLocal("Transaction synchronizations"); // 事务的名称 private static final ThreadLocal currentTransactionName = new NamedThreadLocal("Current transaction name"); // 事务是否是只读 private static final ThreadLocal currentTransactionReadOnly = new NamedThreadLocal("Current transaction read-only status"); // 事务的隔离级别 private static final ThreadLocal currentTransactionIsolationLevel = new NamedThreadLocal("Current transaction isolation level"); // 事务是否开启 actual:真实的 private static final ThreadLocal actualTransactionActive = new NamedThreadLocal("Actual transaction active"); }

上面是它的属性,下面我们看看它的方法:

// 获取连接map public static Map getResourceMap() { Map map = (Map)resources.get(); return map != null ? Collections.unmodifiableMap(map) : Collections.emptyMap(); } public static boolean hasResource(Object key) { Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key); Object value = doGetResource(actualKey); return value != null; } // 指定key值获取连接 @Nullable public static Object getResource(Object key) { Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key); Object value = doGetResource(actualKey); if (value != null && logger.isTraceEnabled()) { logger.trace("Retrieved value [" + value + "] for key [" + actualKey + "] bound to thread [" + Thread.currentThread().getName() + "]"); } return value; } @Nullable private static Object doGetResource(Object actualKey) { Map map = (Map)resources.get(); if (map == null) { return null; } else { Object value = map.get(actualKey); if (value instanceof ResourceHolder && ((ResourceHolder)value).isVoid()) { map.remove(actualKey); if (map.isEmpty()) { resources.remove(); } value = null; } return value; } } // 将连接信息绑定到当前key public static void bindResource(Object key, Object value) throws IllegalStateException { Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key); Assert.notNull(value, "Value must not be null"); Map map = (Map)resources.get(); if (map == null) { map = new HashMap(); resources.set(map); } Object oldValue = ((Map)map).put(actualKey, value); if (oldValue instanceof ResourceHolder && ((ResourceHolder)oldValue).isVoid()) { oldValue = null; } if (oldValue != null) { throw new IllegalStateException("Already value [" + oldValue + "] for key [" + actualKey + "] bound to thread [" + Thread.currentThread().getName() + "]"); } else { if (logger.isTraceEnabled()) { logger.trace("Bound value [" + value + "] for key [" + actualKey + "] to thread [" + Thread.currentThread().getName() + "]"); } } } // 解绑当前线程指定key的连接 public static Object unbindResource(Object key) throws IllegalStateException { Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key); Object value = doUnbindResource(actualKey); if (value == null) { throw new IllegalStateException("No value for key [" + actualKey + "] bound to thread [" + Thread.currentThread().getName() + "]"); } else { return value; } } @Nullable public static Object unbindResourceIfPossible(Object key) { Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key); return doUnbindResource(actualKey); } @Nullable private static Object doUnbindResource(Object actualKey) { Map map = (Map)resources.get(); if (map == null) { return null; } else { Object value = map.remove(actualKey); if (map.isEmpty()) { resources.remove(); } if (value instanceof ResourceHolder && ((ResourceHolder)value).isVoid()) { value = null; } if (value != null && logger.isTraceEnabled()) { logger.trace("Removed value [" + value + "] for key [" + actualKey + "] from thread [" + Thread.currentThread().getName() + "]"); } return value; } } //当前线程的事务同步是否处于活动状态。 //在注册之前调用,以避免不必要的实例创建 public static boolean isSynchronizationActive() { return synchronizations.get() != null; } //初始化事务同步器,如果当前事务同步器存在则抛异常 public static void initSynchronization() throws IllegalStateException { if (isSynchronizationActive()) { throw new IllegalStateException("Cannot activate transaction synchronization - already active"); } else { logger.trace("Initializing transaction synchronization"); synchronizations.set(new LinkedHashSet()); } } //注册同步器 ,前提是当前线程的事务同步是否处于活动状态,否则会报错 public static void registerSynchronization(TransactionSynchronization synchronization) throws IllegalStateException { Assert.notNull(synchronization, "TransactionSynchronization must not be null"); if (!isSynchronizationActive()) { throw new IllegalStateException("Transaction synchronization is not active"); } else { ((Set)synchronizations.get()).add(synchronization); } } // 获取所有事务同步器,返回是不可以修改集合 public static List getSynchronizations() throws IllegalStateException { Set synchs = (Set)synchronizations.get(); if (synchs == null) { throw new IllegalStateException("Transaction synchronization is not active"); } else if (synchs.isEmpty()) { return Collections.emptyList(); } else { List sortedSynchs = new ArrayList(synchs); AnnotationAwareOrderComparator.sort(sortedSynchs); return Collections.unmodifiableList(sortedSynchs); } } // 清除所有事务同步器 public static void clearSynchronization() throws IllegalStateException { if (!isSynchronizationActive()) { throw new IllegalStateException("Cannot deactivate transaction synchronization - not active"); } else { logger.trace("Clearing transaction synchronization"); synchronizations.remove(); } } // 设置当前事务名称 public static void setCurrentTransactionName(@Nullable String name) { currentTransactionName.set(name); } // 获取当前事务名称 @Nullable public static String getCurrentTransactionName() { return (String)currentTransactionName.get(); } // 设置当前事务是否只读 public static void setCurrentTransactionReadOnly(boolean readOnly) { currentTransactionReadOnly.set(readOnly ? Boolean.TRUE : null); } // 判断当前会务是否只读 public static boolean isCurrentTransactionReadOnly() { return currentTransactionReadOnly.get() != null; } // 设置事务隔离级别 public static void setCurrentTransactionIsolationLevel(@Nullable Integer isolationLevel) { currentTransactionIsolationLevel.set(isolationLevel); } // 获取事务隔离级别 @Nullable public static Integer getCurrentTransactionIsolationLevel() { return (Integer)currentTransactionIsolationLevel.get(); } // 设置事务状态是否开启 public static void setActualTransactionActive(boolean active) { actualTransactionActive.set(active ? Boolean.TRUE : null); } // 判断事务是否开启 public static boolean isActualTransactionActive() { return actualTransactionActive.get() != null; } // 清除事务同步器 public static void clear() { synchronizations.remove(); currentTransactionName.remove(); currentTransactionReadOnly.remove(); currentTransactionIsolationLevel.remove(); actualTransactionActive.remove(); }

上面就是事务同步器的所有方法,理解起来很简单,只是简单的增删,判断等,上面有一个设置事务的隔离级别,这里提下,帮助复习下隔离级别有哪些:

1.READ_UNCOMMITTED 读未提交 2.READ_COMMITTED 读已提交 4.REPEATABLE_READ 可重复读 8.SERIALIZABLE 序列化 TransactionSynchronization源码探讨

这个类是程序员对事务同步的扩展点:用于事务同步回调的接口:

// 正常提交状态 int STATUS_COMMITTED = 0; // 回滚状态 int STATUS_ROLLED_BACK = 1; // 不明状态 int STATUS_UNKNOWN = 2; // 事务挂起 default void suspend() { } // 事务恢复 default void resume() { } // 将基础会话刷新到数据存储区(如果适用),比如Hibernate/JPA的Session default void flush() { } // 在事务提交前触发,此处若发生异常,会导致回滚 default void beforeCommit(boolean readOnly) { } // 在beforeCommit之后,commit/rollback之前执行。即使异常,也不会回滚。 default void beforeCompletion() { } // 事务提交后执行。 default void afterCommit() { } // 事务提交/回滚执行 default void afterCompletion(int status) { }

一般而言,我们在TransactionSynchronization使用最多的是afterCommit和afterCompletion方法。可以在事务执行完毕之后,直接调用afterCommit()方法进行异步通知.afterCommit 这个方法可通过入参status进行判断当前事务处于什么状态来执行相应的逻辑。还有一个适配器:TransactionSynchronizationAdapter 这个实现了Ordered类,多了一个可以设置执行顺序的功能,其他的功能方法跟TransactionSynchronization一样。

应用:

sql的数据库连接跟当前线程绑定,也是用到这事务管理器.在SpringCache的自定义CacheManager中。装饰Cache对象使其支持事务操作。即只有在事务提交成功之后,才会进行缓存.这个也是运用了事务管理器.

今天记录到此,我们下期再见!

你知道的越多,你不知道的越多!



【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3